/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *	  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sirona.store.gauge;

import org.apache.sirona.Role;
import org.apache.sirona.SironaException;
import org.apache.sirona.configuration.Configuration;

import java.lang.reflect.Constructor;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DelegatedCollectorGaugeDataStore implements CollectorGaugeDataStore {
	private final ConcurrentMap<String, GaugeDataStore> dataStores = new ConcurrentHashMap<String, GaugeDataStore>();

	private final Class<? extends GaugeDataStore> delegateClass;

	public DelegatedCollectorGaugeDataStore() {
		try {
			delegateClass = Class.class.cast(
				DelegatedCollectorGaugeDataStore.class.getClassLoader().loadClass( // use this classloader and not TCCL to avoid issues
					Configuration.getProperty(Configuration.CONFIG_PROPERTY_PREFIX + "collector.gauge.store-class", InMemoryGaugeDataStore.class.getName())));
		} catch (final ClassNotFoundException e) {
			throw new SironaException(e.getMessage(),e);
		}
	}

	protected GaugeDataStore newStore(final String marker) {
		try {
			try {
				final Constructor<? extends GaugeDataStore> cons = delegateClass.getConstructor(String.class);
				return cons.newInstance(marker);
			} catch (final Exception e) {
				// no-op: use default constructor
			}
			return delegateClass.newInstance();
		} catch (final Exception e) {
			throw new SironaException(e);
		}
	}

	@Override
	public SortedMap<Long, Double> getGaugeValues(final GaugeValuesRequest gaugeValuesRequest, final String marker) {
		final GaugeDataStore gaugeDataStore = dataStores.get(marker);
		if (gaugeDataStore == null) {
			return new TreeMap<Long, Double>();
		}
		return gaugeDataStore.getGaugeValues(gaugeValuesRequest);
	}

	@Override
	public void createOrNoopGauge(final Role role, final String marker) {
		GaugeDataStore gaugeDataStore = dataStores.get(marker);
		if (gaugeDataStore == null) {
			gaugeDataStore = newStore(marker);
			final GaugeDataStore existing = dataStores.putIfAbsent(marker, gaugeDataStore);
			if (existing != null) {
				gaugeDataStore = existing;
			}
		}
		gaugeDataStore.createOrNoopGauge(role);
	}

	@Override
	public void addToGauge(final Role role, final long time, final double value, final String marker) {
		createOrNoopGauge(role, marker); // this implementation doesn't mandates createOrNoopGauge call
		dataStores.get(marker).addToGauge(role, time, value);
	}

	@Override
	public Collection<String> markers() {
		return dataStores.keySet();
	}

	@Override // TODO: see if using a period to aggregate data wouldn't make more sense
	public SortedMap<Long, Double> getGaugeValues(final GaugeValuesRequest gaugeValuesRequest) {
		final SortedMap<Long, Double> values = new TreeMap<Long, Double>();

		for (final Map.Entry<String, GaugeDataStore> marker : dataStores.entrySet()) {
			final Map<Long, Double> gaugeValues = marker.getValue().getGaugeValues(gaugeValuesRequest);
			for (final Map.Entry<Long, Double> entry : gaugeValues.entrySet()) {
				final Long key = entry.getKey();
				final Double value = values.get(key);
				final Double thisValue = entry.getValue();
				if (value == null) {
					values.put(key, thisValue);
				} else {
					values.put(key, value + thisValue);
				}
			}
		}

		return values;
	}

	@Override
	public Collection<Role> gauges() {
		final Set<Role> roles = new HashSet<Role>();
		for (final GaugeDataStore store : dataStores.values()) {
			roles.addAll(store.gauges());
		}
		return roles;
	}

	@Override
	public Role findGaugeRole(final String name) {
		for (final GaugeDataStore store : dataStores.values()) {
			final Role role = store.findGaugeRole(name);
			if (role != null) {
				return role;
			}
		}
		return null;
	}

	@Override
	public void gaugeStopped(final Role gauge) {
		for (final GaugeDataStore store : dataStores.values()) {
			store.gaugeStopped(gauge);
		}
	}

	public void reset() {
		dataStores.clear();
	}
}
